Hudi on Spark

Applies to Hudi version 0.10.1.

1 Configuration

  • Spark 2.4.3+ is supported.
  • Spark SQL DML support was added in 0.9.0, but it is still experimental.
  • Spark 3 support matrix:
Hudi              Supported Spark 3 version
0.10.0            3.1.x (default build), 3.0.x
0.7.0 - 0.9.0     3.0.x
0.6.0 and prior   not supported
  • Spark SQL

Reads and writes through SQL are enabled by the HoodieSparkSessionExtension.

Note that the spark-avro package must also be added, with a version matching the Spark version in use. The commands below show the required flags; a programmatic equivalent is sketched after them.

# Spark SQL for spark 3.1
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark SQL for spark 3.0
spark-sql --packages org.apache.hudi:hudi-spark3.0.3-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.0.3 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark SQL for spark 2 with scala 2.11
spark-sql --packages org.apache.hudi:hudi-spark-bundle_2.11:0.10.1,org.apache.spark:spark-avro_2.11:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark SQL for spark 2 with scala 2.12
spark-sql \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:2.4.4 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
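
For applications that build their own SparkSession rather than passing --conf flags, the same two settings can be applied in code. A minimal sketch, assuming the matching hudi bundle and spark-avro jars are already on the classpath (the app name is illustrative):

// application code: equivalent of the --conf flags above
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().
  appName("hudi-quickstart").
  // Hudi requires Kryo serialization
  config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  // enables Hudi's Spark SQL extensions (DDL/DML support)
  config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension").
  getOrCreate()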
  • Spark DataSource

    See DataGenerator for the data generator used in the examples below.

    // spark-shell
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    val dataGen = new DataGenerator

2 Spark SQL Type Support

Spark       Hudi        Notes
boolean     boolean
byte        int         *
short       int         *
integer     int
long        long
date        date
timestamp   timestamp
float       float
double      double
string      string
decimal     decimal
binary      bytes       *
array       array
map         map
struct      struct
char        not supported
varchar     not supported
numeric     not supported
null        not supported
object      not supported
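
As a quick illustration of this mapping, the sketch below (table name is illustrative) declares a table using only supported types; per the table above, columns of char, varchar, numeric, null, or object type cannot be used:

-- sketch: a Hudi table declared with only supported Spark SQL types
create table hudi_type_demo_tbl (
  id int,                                 -- integer -> int
  amount decimal(10, 2),                  -- decimal -> decimal
  created date,                           -- date -> date
  updated timestamp,                      -- timestamp -> timestamp
  tags array<string>,                     -- array -> array
  attrs map<string, string>,              -- map -> map
  addr struct<city: string, zip: string>  -- struct -> struct
) using hudi
tblproperties (primaryKey = 'id');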

3 Creating Tables

With the DataSource API, tables are created automatically on write.

With Spark SQL, tables must be created explicitly, which involves the following choices:

  • Table type

    cow or mor

  • Partitioned or not

    Determined by the presence of a partitioned by clause.

  • Managed or external

    An external table is declared with location or with create external table; otherwise a managed table is created. See the documentation on managed and external tables.

Notes:

  • Since 0.10.0 a primary key is mandatory, including for tables created with earlier versions. uuid is used as the primary key by default.
  • primaryKey, preCombineField, and type are case sensitive.
  • Prefer setting these via tblproperties rather than options.
  • Tables created through Spark SQL default hoodie.table.keygenerator.class to org.apache.hudi.keygen.ComplexKeyGenerator and hoodie.datasource.write.hive_style_partitioning to true.
-- create a cow table, with default primaryKey 'uuid' and without preCombineField provided
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi;


-- create a mor non-partitioned table without preCombineField provided
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);

-- create a partitioned, preCombineField-provided cow table
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

-- create an external hudi table based on an existing path

-- for non-partitioned table
create table hudi_existing_tbl0 using hudi
location 'file:///tmp/hudi/dataframe_hudi_nonpt_table';

-- for partitioned table
create table hudi_existing_tbl1 using hudi
partitioned by (dt, hh)
location 'file:///tmp/hudi/dataframe_hudi_pt_table';

-- CTAS uses bulk_insert to load the data
-- CTAS: create a non-partitioned cow table without preCombineField
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;

-- CTAS: create a partitioned, preCombineField-provided cow table
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

-- create managed parquet table
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';

-- CTAS by loading data into hudi table
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;

Key configuration options

Parameter Name    Default   Introduction
primaryKey        uuid      The primary key names of the table, multiple fields separated by commas. Same as hoodie.datasource.write.recordkey.field.
preCombineField   (none)    The pre-combine field of the table. Same as hoodie.datasource.write.precombine.field.
type              cow       The table type to create: type = 'cow' means a COPY-ON-WRITE table, while type = 'mor' means a MERGE-ON-READ table. Same as hoodie.datasource.write.table.type.
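
To check what a table actually recorded, the properties can be read back with standard Spark SQL; for hudi_cow_pt_tbl the output should include the three options above alongside the Hudi defaults noted in section 3:

-- read back the table properties set at creation time
show tblproperties hudi_cow_pt_tbl;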

4 Inserting Data

  • Scala
// spark-shell
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)
  • Spark SQL
-- insert into non-partitioned table
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

-- insert dynamic partition
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

-- insert static partition
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

-- if preCombineField is provided, insert runs in upsert mode
-- upsert mode for preCombineField-provided table
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001

-- bulk_insert is also supported, but requires the following two settings
-- bulk_insert mode for preCombineField-provided table
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
1 a1_2 20.0 1002

5 Querying Data

(1) Scala

Load the data:

// spark-shell
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Time travel query (>= 0.9.0)

Three timestamp formats are supported:

spark.read.
format("hudi").
option("as.of.instant", "20210728141108100").
load(basePath)

spark.read.
format("hudi").
option("as.of.instant", "2021-07-28 14:11:08.200").
load(basePath)

// It is equal to "as.of.instant = 2021-07-28 00:00:00"
spark.read.
format("hudi").
option("as.of.instant", "2021-07-28").
load(basePath)

Note

Since 0.9.0, Hudi has a built-in file index, HoodieFileIndex, which supports partition pruning and metadata-based listing and can improve query performance.

It also supports non-global query paths, i.e. loading a table by its base path without a trailing glob pattern.
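
A minimal sketch of the difference, reusing basePath from the quick start (the trips table has three partition levels, hence the four-level glob):

// spark-shell
// before 0.9.0, a glob down to the data files was required
val tripsGlobDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")

// with HoodieFileIndex (0.9.0+), the base path alone is enough
val tripsBaseDF = spark.read.format("hudi").load(basePath)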

(2) Spark SQL

select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0
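
The Hudi metadata columns are exposed to SQL as well; this mirrors the second spark-shell query in (1):

select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot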

6 Updating Data

(1) Scala

// spark-shell
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

(2) Spark SQL

1) Update

Updates require preCombineField to be set on the table.

-- Syntax
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

-- Examples
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;

2) MergeInto

-- Syntax
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

<merge_condition> = a boolean equality condition
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

-- Examples
-- source table using hudi for testing merging into non-partitioned table
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;

-- source table using parquet for testing merging into partitioned table
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

merge into hudi_cow_pt_tbl as target
using (
select id, name, '1000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

7 Incremental Query

Query the data written after a given commit time.

// spark-shell
// reload data
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2) // commit time we are interested in

// incrementally query data
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

8 Point-in-Time Query

Query the data written within a given time range.

// spark-shell
val beginTime = "000" // Represents all commits > this time.
val endTime = commits(commits.length - 2) // commit time we are interested in

//incrementally query data
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

9 Deleting Data

  • Scala

    Deletes are only supported in Append mode.

    // spark-shell
    // fetch total records count
    spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
    // fetch two records to be deleted
    val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

    // issue deletes
    val deletes = dataGen.generateDeletes(ds.collectAsList())
    val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

    df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(OPERATION_OPT_KEY,"delete").
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)

    // run the same read query as above.
    val roAfterDeleteViewDF = spark.
    read.
    format("hudi").
    load(basePath)

    roAfterDeleteViewDF.createOrReplaceTempView("hudi_trips_snapshot")
    // fetch should return (total - 2) records
    spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
  • Spark SQL

    -- Syntax
    DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

    -- Examples
    delete from hudi_cow_nonpcf_tbl where uuid = 1;
    delete from hudi_mor_tbl where id % 2 = 0;

10 Insert Overwrite

Compared with upsert, insert overwrite is better suited to batch jobs that recompute entire target partitions, because it bypasses the indexing, precombining, and repartitioning steps of the incremental upsert path.

  • Scala

    // spark-shell
    spark.
    read.format("hudi").
    load(basePath).
    select("uuid","partitionpath").
    sort("partitionpath","uuid").
    show(100, false)

    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.
    read.json(spark.sparkContext.parallelize(inserts, 2)).
    filter("partitionpath = 'americas/united_states/san_francisco'")
    df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(OPERATION.key(),"insert_overwrite").
    option(PRECOMBINE_FIELD.key(), "ts").
    option(RECORDKEY_FIELD.key(), "uuid").
    option(PARTITIONPATH_FIELD.key(), "partitionpath").
    option(TBL_NAME.key(), tableName).
    mode(Append).
    save(basePath)

    // Should have different keys now for San Francisco alone, from query before.
    spark.
    read.format("hudi").
    load(basePath).
    select("uuid","partitionpath").
    sort("partitionpath","uuid").
    show(100, false)
  • Spark SQL

    On a partitioned table, insert overwrite maps to the insert_overwrite write operation; on a non-partitioned table it maps to insert_overwrite_table.

    -- insert overwrite non-partitioned table
    insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
    insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

    -- insert overwrite partitioned table with dynamic partition
    insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';

    -- insert overwrite partitioned table with static partition
    insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

11 Other Commands

(1) Alter Table

-- Syntax
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')


-- Examples
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);

--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;

--set properties:
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

(2) Partition Commands

Currently, show partitions lists partitions based on the paths present in the file system, so its output may become inaccurate after partitions are dropped or all data in a partition is deleted.

-- Syntax
-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )

-- Show Partitions
SHOW PARTITIONS tableIdentifier


-- Examples
--show partition:
show partitions hudi_cow_pt_tbl;

--drop partition:
alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');

12 Further Reading
